Оператор exhaustMap

exhaustMap - это оператор высшего порядка (Higher-Order Observable) в RxJS, который используется для преобразования элементов одного потока данных (Observable) в другой поток данных. Он применяет функцию к каждому элементу и возвращает новый Observable, который игнорирует все внутренние Observable, пока не завершится текущий внутренний Observable.

Давайте разберемся с примером кода и объясним каждую деталь:

import { interval } from 'rxjs';
import { exhaustMap, take } from 'rxjs/operators';

// Исходный поток данных
const source$ = interval(1000).pipe(take(3));

// Функция-преобразователь, принимающая элементы и возвращающая внутренний Observable
const transform = (value: number) => {
  // Создаем внутренний Observable с таймером
  const inner$ = interval(500).pipe(take(3));
  return inner$;
};

// Применяем exhaustMap для каждого элемента и игнорируем новые внутренние Observable, пока не завершится текущий внутренний Observable
const result$ = source$.pipe(
  exhaustMap(transform)
);

// Подписываемся на результат
result$.subscribe(result => console.log(result));

В этом примере у нас есть исходный поток данных source$, который эмитит значения каждую секунду (0, 1, 2) и завершается после трех значений. Затем у нас есть функция-преобразователь transform, которая принимает каждый элемент исходного потока и возвращает внутренний Observable inner$, содержащий значения, эмитимые каждые 500 миллисекунды (0, 1, 2) и завершающийся после трех значений.

С помощью оператора exhaustMap мы применяем функцию-преобразователь к каждому элементу исходного потока. Однако, если внутренний Observable уже активен (не завершился), новые внутренние Observable игнорируются до тех пор, пока текущий внутренний Observable не завершится. Таким образом, если новое значение приходит, пока предыдущий внутренний Observable все еще активен, оно игнорируется.

Результат подписки на result$ выводится в консоль: 0, 1, 2.

В данном случае, когда первое значение 0 из исходного потока приходит, exhaustMap применяет функцию-преобразователь transform, которая создает внутренни

й Observable inner$. Этот внутренний Observable эмитит значения 0, 1, 2 каждые 500 миллисекунд. Когда первый внутренний Observable завершается, exhaustMap ждет следующего значения из исходного потока и повторяет процесс.

Таким образом, exhaustMap позволяет нам контролировать последовательность исходного потока данных и игнорировать новые значения, пока текущий внутренний Observable не завершится.